package xiaoa.java.spider.load;

import java.util.ArrayList;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.bson.types.ObjectId;

import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Sorts;

import xiaoa.java.log.L;
import xiaoa.java.mongoDB.DbMgr;
import xiaoa.java.spider.db.vo.FetchUrl;

/**
 * Background loader that moves pending {@link FetchUrl} records from the database
 * into a work queue.
 *
 * <p>Each iteration of {@link #run()} calls {@link #load()} to fetch a batch of
 * records whose {@code state} equals {@code STATE_WAIT}, updates those records in
 * the database to {@code STATE_IN}, and then puts every loaded record on the
 * supplied {@link BlockingQueue}.
 *
 * @param <B> concrete {@link FetchUrl} subtype handled by this loader
 * @author xiaoa
 * @date 2017-10-07 15:15:20
 * @version V1.0
 */
public class LoadRun<B extends FetchUrl> implements Runnable {

	/** Pause between polls when no pending record was found. */
	private static final long IDLE_SLEEP_MILLIS = TimeUnit.SECONDS.toMillis(30);

	/** Pause before retrying after an iteration failed. */
	private static final long ERROR_SLEEP_MILLIS = TimeUnit.SECONDS.toMillis(10);

	/** Destination queue; {@code put} blocks while the queue is full. */
	BlockingQueue<B> queue = null;

	/** Runtime class of {@code B}, used for database calls and to create the update template. */
	protected Class<B> bc = null;

	/**
	 * Creates a loader.
	 *
	 * @param queue queue that receives the loaded records
	 * @param bc    runtime class of the record type
	 */
	public LoadRun(BlockingQueue<B> queue, Class<B> bc) {
		this.queue = queue;
		this.bc = bc;
	}

	/**
	 * Polls the database until the running thread is interrupted.
	 *
	 * <p>An empty (or {@code null}) batch causes a 30 second pause and a new poll.
	 * Any other failure is printed and retried after a 10 second pause. When the
	 * thread is interrupted, the interrupt flag is restored and the method returns.
	 */
	@Override
	public void run() {

		Thread.currentThread().setName("load_" + Thread.currentThread().getName());

		// The batch size is whatever load() returns for one call.
		while (!Thread.currentThread().isInterrupted()) {
			try {
				List<B> list = load();

				// Treat a null result like an empty batch instead of failing on list.size().
				int loaded = (list == null) ? 0 : list.size();
				L.info("本次加载 ：list.size = " + loaded);

				if (loaded == 0) {
					Thread.sleep(IDLE_SLEEP_MILLIS);
					// Nothing to mark or enqueue; skip the database update entirely.
					continue;
				}

				List<ObjectId> keyList = new ArrayList<>(loaded);
				for (B url : list) {
					keyList.add(new ObjectId(url.getId()));
				}

				// Template object carrying the new state and message for the bulk update.
				B updateUrl = bc.getDeclaredConstructor().newInstance();
				updateUrl.setState(B.STATE_IN);
				updateUrl.setMessage("处理中");

				long count = DbMgr.updateMany(updateUrl, Filters.in("_id", keyList), bc);

				L.info("修改状态 ： count = " + count);

				// The state is changed before enqueueing, so the next load() call
				// (which filters on STATE_WAIT) does not return these records again.
				for (B url : list) {
					queue.put(url);
				}

			} catch (InterruptedException e) {
				// Restore the flag so the owner of this thread can observe the interrupt.
				Thread.currentThread().interrupt();
				return;
			} catch (Throwable e) {
				// Best effort: report the failure and retry after a short pause.
				e.printStackTrace();
				try {
					Thread.sleep(ERROR_SLEEP_MILLIS);
				} catch (InterruptedException e1) {
					Thread.currentThread().interrupt();
					return;
				}
			}
		}
	}

	/**
	 * Loads one batch of records waiting to be processed.
	 *
	 * <p>Queries records whose {@code state} equals {@code STATE_WAIT}, sorted by
	 * {@code createTime} ascending. NOTE(review): the trailing arguments {@code 1}
	 * and {@code 500} are presumably page number and page size — confirm against
	 * {@code DbMgr.search}.
	 *
	 * @return the records returned by {@code DbMgr.search}
	 * @throws Throwable if the underlying search fails
	 * @author xiaoa
	 */
	public List<B> load() throws Throwable {
		return DbMgr.search(bc, Filters.eq("state", B.STATE_WAIT), Sorts.ascending("createTime"), 1, 500);
	}

}
